package com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper;

import com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper.data.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ZookeeperServiceImpl implements ZookeeperService {

    private CuratorFramework client;

    private ObjectMapper objectMapper;

    public ZookeeperServiceImpl() {
    }

    public ZookeeperServiceImpl(CuratorFramework client, ObjectMapper objectMapper) {
        this.client = client;
        this.objectMapper = objectMapper;
    }

    @Override
    public Controller getController() {
        try {
            byte[] epochBytes = client.getData().forPath(String.format("/controller_epoch"));
            Optional<Controller> optional = getPathData("/controller", Controller.class);
            optional.ifPresent(t -> t.setController_epoch(new String(epochBytes)));
            return optional.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public List<String> getBrokerIds() {
        try {
            return client.getChildren().forPath("/brokers/ids");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ArrayList<>();
    }

    @Override
    public BrokerInfo getBroker(String brokerId) {
        String path = String.format("/brokers/ids/%s", brokerId);
        Optional<BrokerInfo> optional = getPathData(path, BrokerInfo.class);
        optional.ifPresent(t -> t.setId(brokerId));
        return optional.get();
    }

    @Override
    public List<BrokerInfo> getBrokers() {
        return getBrokerIds().stream().map(this::getBroker).collect(Collectors.toList());
    }

    @Override
    public List<String> getTopicNames() {
        try {
            return client.getChildren().forPath("/brokers/topics");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ArrayList<>();
    }

    @Override
    public TopicInfo getTopic(String topic) {
        String path = String.format("/brokers/topics/%s", topic);
        Optional<TopicInfo> optional = getPathData(path, TopicInfo.class);
        optional.ifPresent(t -> t.setTopic(topic));
        return optional.get();
    }

    protected <T> Optional<T> getPathData(String path, Class<T> clazz) {
        try {
            byte[] bytes = client.getData().forPath(path);
            return Optional.ofNullable(this.objectMapper.readValue(new String(bytes), clazz));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Optional.empty();
    }

    @Override
    public List<String> getPartitions(String topic) {
        try {
            return client.getChildren().forPath(String.format("/brokers/topics/%s/partitions", topic));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ArrayList<>();
    }

    @Override
    public TopicPartitionState getTopicPartitionState(String topic, String partition) {
        String path = String.format("/brokers/topics/%s/partitions/%s/state", topic, partition);
        Optional<TopicPartitionState> optional = getPathData(path, TopicPartitionState.class);
        optional.ifPresent(t -> {
            t.setTopic(topic);
            t.setPartition(partition);
        });
        return optional.get();
    }

    @Override
    public List<String> getConsumerGroups() {
        try {
            return client.getChildren().forPath("/consumers");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ArrayList<>();
    }

    @Override
    public List<String> getConsumerIds(String consumerGroup) {
        try {
            return client.getChildren().forPath(String.format("/consumers/%s/ids", consumerGroup));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ArrayList<>();
    }

    @Override
    public ConsumerInfo getConsumer(String consumerGroup, String consumerId) {
        String path = String.format("/consumers/%s/ids/%s", consumerGroup, consumerId);
        Optional<ConsumerInfo> optional = getPathData(path, ConsumerInfo.class);
        optional.ifPresent(t -> {
            t.setConsumerGroup(consumerGroup);
            t.setConsumerId(consumerId);
        });
        return optional.get();
    }

    @Override
    public List<ConsumerInfo> getConsumers(String consumerGroup) {
        return getConsumerIds(consumerGroup).stream()
                .map(consumerId -> this.getConsumer(consumerGroup, consumerId))
                .collect(Collectors.toList());
    }

    public void setClient(CuratorFramework client) {
        this.client = client;
    }

    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }
}
